/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.service.hive.impl;

import com.chinamobile.cmss.lakehouse.common.dto.TableNameDto;
import com.chinamobile.cmss.lakehouse.common.exception.BaseException;
import com.chinamobile.cmss.lakehouse.core.handler.K8sUriHandler;
import com.chinamobile.cmss.lakehouse.dao.entity.MetadataDatabaseEntity;
import com.chinamobile.cmss.lakehouse.service.hive.HiveAccessService;
import com.chinamobile.cmss.lakehouse.service.hive.HiveConnectionPool;
import com.chinamobile.cmss.lakehouse.service.hive.HiveSqlExecutor;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.security.UserGroupInformation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class HiveAccessServiceImpl implements HiveAccessService {

    private final HiveConnectionPool hiveConnectionPool;

    @Autowired
    public HiveAccessServiceImpl(HiveConnectionPool hiveConnectionPool) {
        this.hiveConnectionPool = hiveConnectionPool;
    }

    @Autowired
    private K8sUriHandler k8sUriHandler;

    @Override
    public <T> T loanHiveMetaStoreClient(String userName, FunctionWithException<HiveMetaStoreClient, T> loan) {
        try {
            HiveConf hiveConf = new HiveConf();
            hiveConf.set("hive.metastore.uris", k8sUriHandler.getExternalMetaUrl());
            long start = System.currentTimeMillis();
            UserGroupInformation ugi = UserGroupInformation.createRemoteUser(userName);
            return ugi.doAs((PrivilegedExceptionAction<T>) () -> {
                try (HiveMetaStoreClient hiveMetaClient = new HiveMetaStoreClient(hiveConf)) {
                    long duration = System.currentTimeMillis() - start;
                    log.info("connection hiveMeatStore time: {}", duration / 1000 + "s");
                    return loan.apply(hiveMetaClient);
                }
            });
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void createDb(MetadataDatabaseEntity database, String username) {
        try {
            HiveSqlExecutor hiveSqlExecutor = getHiveSqlExecutor(username);
            String sql = "create database if not exists %s";
            sql = String.format(sql, database.getName());
            hiveSqlExecutor.execute(sql);
        } catch (Exception e) {
            log.error("create hive db failed.", e);
            throw new RuntimeException(e);
        }
    }

    @Override
    public void deleteDb(MetadataDatabaseEntity database, String username) {
        try {
            HiveSqlExecutor hiveSqlExecutor = getHiveSqlExecutor(username);
            String sql = "drop database if exists %s";
            sql = String.format(sql, database.getName());
            hiveSqlExecutor.execute(sql);
        } catch (Exception e) {
            log.error("drop hive db failed.", e);
            throw new RuntimeException(e);
        }
    }

    @Override
    public HiveSqlExecutor getHiveSqlExecutor(String userName) {
        return new HiveSqlExecutor(hiveConnectionPool, userName);
    }

    @Override
    public boolean isDatabaseExist(String databaseName) {
        return true;
    }

    @Override
    public boolean isTableExist(String database, String table, String userName) {
        return loanHiveMetaStoreClient(userName, client -> client.tableExists(database, table, userName));
    }

    @Override
    public void accessPermissionCheck(String userName, TableNameDto tableNameDto) {
        Set<String> userDatabases = getDatabasesByUser(userName);
        if (!userDatabases.contains(tableNameDto.getDatabaseName())) {
            throw new BaseException("user " + userName
                + " does has permission access " + tableNameDto + " or " + tableNameDto + " does not exist");
        }
    }

    @Override
    public List<Table> getTables(String dataBaseName, String userName) {
        return loanHiveMetaStoreClient(userName, client -> {
            long start = System.currentTimeMillis();
            List<String> tableName = new ArrayList<>();

            try {
                tableName = client.getAllTables(dataBaseName);
            } catch (Exception e) {
                log.info("get all tables error:", e.getMessage());
            }
            List<Table> tables = new ArrayList<>(tableName.size());
            for (String tn : tableName) {
                try {
                    tables.add(client.getTable(dataBaseName, tn));
                } catch (Exception e) {
                    log.info("get " + dataBaseName + "table error:", e.getMessage());
                }
            }
            long duration = System.currentTimeMillis() - start;
            log.info("get {} tables cost {} time", tables.size(), duration / 1000 + "s");
            return tables;
        });
    }

    @Override
    public Table getTable(String databaseName, String tableName, String userName) {
        return loanHiveMetaStoreClient(userName, client -> {
            try {
                long start = System.currentTimeMillis();
                final Table table = client.getTable(databaseName, tableName);
                long duration = System.currentTimeMillis() - start;
                log.info("get {}.{} detail cost {} time", databaseName, tableName, duration);
                return table;
            } catch (NoSuchObjectException ignore) {
                return null;
            }
        });
    }

    @Override
    public void deleteTable(String userName, TableNameDto tableNameDto) {
        try (HiveSqlExecutor hiveSqlExecutor = getHiveSqlExecutor(userName)) {
            hiveSqlExecutor.execute("DROP table " + tableNameDto.toString());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public Set<String> getDatabasesByUser(String userName) {
        try (HiveSqlExecutor hiveSqlExecutor = getHiveSqlExecutor(userName)) {
            Optional<Set<String>> resultSetOpt = hiveSqlExecutor.execute("show databases", (Function<ResultSet, Set<String>>) rs -> {
                Set<String> set = new HashSet<>();
                try {
                    while (rs.next()) {
                        set.add(rs.getString(1));
                    }
                } catch (SQLException sqlException) {
                    throw new RuntimeException(sqlException);
                }
                return set;
            });
            return resultSetOpt.orElseGet(Collections::emptySet);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean executeSql(String userName, String sql) {
        return loanHiveMetaStoreClient(userName, client -> {
            try {
                HiveSqlExecutor hiveSqlExecutor = getHiveSqlExecutor(userName);
                hiveSqlExecutor.execute(sql);
                return true;
            } catch (Exception e) {
                log.error("execute hive sql[{}] failed", sql, e);
                throw new RuntimeException(e);
            }
        });
    }
}
